package com.example.mq.mqclient;

import com.example.mq.common.*;
import com.example.mq.mqserver.core.BasicProperties;
import com.example.mq.mqserver.core.ExchangeType;
import lombok.Data;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Created with IntelliJ IDEA.
 * Description:
 * User: ws
 * Date: 2023-10-06
 * Time: 17:04
 */
/*
 * 表示一个逻辑上的连接
 * 他提供一系列的方法去和服务器的核心 api 对应（客户端提供的方法仅仅是发送特定的请求，调用服务器的方法）
 *
 * 一个客户端可以有多个模块
 * 每个模块都可以和 brokerServer 之间建立”逻辑上的连接“（即 channel）
 * 但是这几个 channel 复用同一个 TCP 连接
 */
@Data
public class Channel {
    private String channelId;
    // 属于哪个 Connection
    private Connection connection;
    // 用来存储后续客户端收到的服务器的响应
    private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>();
    // 如果当前的 channel 订阅了某个队列，就需要在此处记录下对应的回调是啥，当服务器队列的消息返回回来的时候，调用回调
    // 此处约定一个 channel 只能有一个回调
    private Consumer consumer = null;

    public Channel(String channelId, Connection connection) {
        this.channelId = channelId;
        this.connection = connection;
    }

    // 在这个方法中，和服务器进行交互，告知服务器，此处客户端创建了新的 channel 了
    public boolean createChannel() throws IOException {
        // 对于创建 channel 操作来说，payload 就是一个 basicArguments对象
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setChannelId(channelId);
        basicArguments.setRid(generateRid());
        byte[] payload = BinaryTool.toBytes(basicArguments);
        Request request = new Request();
        request.setType(0x1);// 0x1 就是创建 channel
        request.setLength(payload.length);
        request.setPayload(payload);

        // 发送
        connection.writeRequest(request);
        // 等待服务器响应
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }

    // 期望通过这个方法阻塞等待服务器响应
    // 因为请求是按顺序发的，但是响应可能会因为各种原因，不按照预计的顺序返回
    // 先到的响应存放到 basicReturnMap 中，需要时根据 rid，取走
    private BasicReturns waitResult(String rid) {
        BasicReturns basicReturns = null;
        while ((basicReturns = basicReturnsMap.get(rid)) == null) {
            // 如果查询的结果为 null，说明响应还没回来
            // 此时需要阻塞等待
            synchronized (this) {// wait 操作需要加锁
                try {
                    wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        // 读取成功之后，需要将该消息从哈希表中删除
        basicReturnsMap.remove(rid);
        return basicReturns;
    }

    // 使用这个方法唤醒所有的等待响应的线程
    public void putReturns(BasicReturns basicReturns) {
        System.out.println(basicReturns.toString());
        basicReturnsMap.put(basicReturns.getRid(), basicReturns);
        synchronized (this) {
            // 当前也不知道有多少个线程在等待上述的响应
            // 把所有的等待的线程都唤醒
            notifyAll();
        }
    }


    private String generateRid() {
        return "R-" + UUID.randomUUID().toString();
    }

    // 关闭 channel，车服务器发送一个 type = 0x2 的请求
    public boolean close() throws IOException {
        BasicArguments basicArguments = new BasicArguments();
        basicArguments.setRid(generateRid());
        basicArguments.setChannelId(channelId);
        byte[] payload = BinaryTool.toBytes(basicArguments);
        Request request = new Request();
        request.setType(0x2);
        request.setLength(payload.length);
        request.setPayload(payload);
        connection.writeRequest(request);
        BasicReturns basicReturns = waitResult(basicArguments.getRid());
        return basicReturns.isOk();
    }

    // 创建交换机
    public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable,
                                   boolean autoDelete, Map<String, Object> arguments) throws IOException {
        ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments();
        exchangeDeclareArguments.setRid(generateRid());
        exchangeDeclareArguments.setChannelId(channelId);
        exchangeDeclareArguments.setExchangeName(exchangeName);
        exchangeDeclareArguments.setExchangeType(exchangeType);
        exchangeDeclareArguments.setDurable(durable);
        exchangeDeclareArguments.setAutoDelete(autoDelete);
        exchangeDeclareArguments.setArguments(arguments);
        byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments);

        Request request = new Request();
        request.setType(0x3);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid());
        return basicReturns.isOk();
    }

    // 删除交换机
    public boolean exchangeDelete(String exchangeName) throws IOException {
        ExchangeDeleteArguments exchangeDeleteArguments = new ExchangeDeleteArguments();
        exchangeDeleteArguments.setRid(generateRid());
        exchangeDeleteArguments.setChannelId(channelId);
        exchangeDeleteArguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(exchangeDeleteArguments);

        Request request = new Request();
        request.setType(0x4);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(exchangeDeleteArguments.getRid());
        return basicReturns.isOk();
    }

    // 创建队列
    public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete,
                 Map<String, Object> arguments) throws IOException {
        QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments();
        queueDeclareArguments.setRid(generateRid());
        queueDeclareArguments.setChannelId(channelId);
        queueDeclareArguments.setQueueName(queueName);
        queueDeclareArguments.setDurable(durable);
        queueDeclareArguments.setExclusive(exclusive);
        queueDeclareArguments.setAutoDelete(autoDelete);
        queueDeclareArguments.setArguments(arguments);
        byte[] payload = BinaryTool.toBytes(queueDeclareArguments);

        Request request = new Request();
        request.setType(0x5);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid());
        return basicReturns.isOk();


    }


    // 删除队列
    public boolean queueDelete(String queueName) throws IOException {
        QueueDeleteArguments queueDeleteArguments = new QueueDeleteArguments();
        queueDeleteArguments.setRid(generateRid());
        queueDeleteArguments.setChannelId(channelId);
        queueDeleteArguments.setQueueName(queueName);
        byte[] payload = BinaryTool.toBytes(queueDeleteArguments);

        Request request = new Request();
        request.setType(0x6);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(queueDeleteArguments.getRid());
        return basicReturns.isOk();
    }

    // 创建绑定
    public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException {
        QueueBindArguments queueBindArguments = new QueueBindArguments();
        queueBindArguments.setRid(generateRid());
        queueBindArguments.setChannelId(channelId);
        queueBindArguments.setQueueName(queueName);
        queueBindArguments.setExchangeName(exchangeName);
        queueBindArguments.setBindingKey(bindingKey);
        byte[] payload = BinaryTool.toBytes(queueBindArguments);

        Request request = new Request();
        request.setType(0x7);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(queueBindArguments.getRid());
        return basicReturns.isOk();
    }

    // 解除绑定
    public boolean queueUnbind(String queueName, String exchangeName) throws IOException {
        QueueUnbindArguments queueUnbindArguments = new QueueUnbindArguments();
        queueUnbindArguments.setRid(generateRid());
        queueUnbindArguments.setChannelId(channelId);
        queueUnbindArguments.setQueueName(queueName);
        queueUnbindArguments.setExchangeName(exchangeName);
        byte[] payload = BinaryTool.toBytes(queueUnbindArguments);

        Request request = new Request();
        request.setType(0x8);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(queueUnbindArguments.getRid());
        return basicReturns.isOk();
    }

    // 发送消息
    public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException {
        BasicPublishArguments basicPublishArguments = new BasicPublishArguments();
        basicPublishArguments.setRid(generateRid());
        basicPublishArguments.setChannelId(channelId);
        basicPublishArguments.setExchangeName(exchangeName);
        basicPublishArguments.setRoutingKey(routingKey);
        basicPublishArguments.setBasicProperties(basicProperties);
        basicPublishArguments.setBody(body);
        byte[] payload = BinaryTool.toBytes(basicPublishArguments);

        Request request = new Request();
        request.setType(0x9);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(basicPublishArguments.getRid());
        return basicReturns.isOk();
    }


    // 订阅消息
    public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException {
        // 先设置回调
        if (this.consumer != null) {
            throw new MqException("该 channel 已经设置过消费消息的回调了，不能重复设置！");
        }
        this.consumer = consumer;

        System.out.println("#########################");
        System.out.println();
        System.out.println(queueName);
        System.out.println();

        BasicConsumeArguments basicConsumeArguments = new BasicConsumeArguments();
        basicConsumeArguments.setRid(generateRid());
        basicConsumeArguments.setChannelId(channelId);
        basicConsumeArguments.setQueueName(queueName);
        basicConsumeArguments.setAutoAck(autoAck);
        basicConsumeArguments.setConsumerTag(channelId);// 此处的 consumerTag 也是用 channelId 表示
        byte[] payload = BinaryTool.toBytes(basicConsumeArguments);

        Request request = new Request();
        request.setType(0xa);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(basicConsumeArguments.getRid());
        return basicReturns.isOk();
    }

    // 确认消息
    public boolean basicAck(String queueName, String messageId) throws IOException {
        BasicAckArguments basicAckArguments = new BasicAckArguments();
        basicAckArguments.setRid(generateRid());
        basicAckArguments.setChannelId(channelId);
        basicAckArguments.setQueueName(queueName);
        basicAckArguments.setMessageId(messageId);
        byte[] payload = BinaryTool.toBytes(basicAckArguments);

        Request request = new Request();
        request.setType(0xb);
        request.setLength(payload.length);
        request.setPayload(payload);

        connection.writeRequest(request);

        BasicReturns basicReturns = waitResult(basicAckArguments.getRid());
        return basicReturns.isOk();
    }


}











